package com.primeton.poctag.task;

import com.primeton.poctag.configure.TaskProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Controls the Spark task queue.
 *
 * <p>On startup this bean submits itself to the injected {@link ThreadPoolTaskExecutor} and then
 * loops: whenever {@link Data3CSparkQueue} reports pending tasks and the number of running tasks
 * is below {@code taskProperties.getParallel()}, it hands pending tasks over for execution. Between
 * scans it waits on its own monitor for at most {@link #SCAN_INTERVAL_MILLIS}, or until
 * {@link #active()} is called.
 *
 * <p>Thread model: {@link #call()} runs on one executor thread; {@link #active()} and
 * {@link #destroy()} may be called from other threads. The {@code activated} flag is guarded by
 * this object's monitor; {@code running} is {@code volatile}.
 *
 * <pre>
 *
 * Created by zhaopx.
 * User: zhaopx
 * Date: 2020/11/17
 * Time: 18:04
 *
 * </pre>
 *
 * @author zhaopx
 */
@Component
@Slf4j
public class TaskActivationTask implements Callable<Map<String, Object>>, TaskActivation,
        InitializingBean, DisposableBean {

    /**
     * Maximum time the scan thread waits between two scans when nobody calls {@link #active()}.
     */
    private static final long SCAN_INTERVAL_MILLIS = TimeUnit.SECONDS.toMillis(30);


    @Autowired
    SparkTaskFactory sparkTaskFactory;


    @Autowired
    ThreadPoolTaskExecutor taskExecutor;


    @Autowired
    TaskProperties taskProperties;


    /**
     * Handle of the submitted scan loop; used to cancel it on shutdown.
     */
    private Future<Map<String, Object>> taskFuture;


    /**
     * Whether the scan loop should keep running. Written by the destroying thread and read by the
     * scan thread, hence volatile.
     */
    private volatile boolean running = true;


    /**
     * Set by {@link #active()} and consumed by the scan thread. Guarded by {@code this}.
     * Remembers an activation that arrives while the scan thread is busy submitting tasks, so
     * the wake-up is not lost and the next scan starts immediately.
     */
    private boolean activated = false;


    /**
     * Prepares the queue with the configured parallelism and starts the scan loop on the
     * injected executor.
     *
     * @throws Exception declared by {@link InitializingBean}; propagated from the calls made here
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        // NOTE(review): presumably creates/configures the Data3CSparkQueue singleton - confirm.
        Data3CSparkQueue.getInstance(taskProperties.getParallel());
        taskFuture = taskExecutor.submit(this);
        log.info("start scan TaskActivationTask thread.");
    }


    /**
     * Requests an immediate scan of the pending queue instead of waiting for the next interval.
     * Safe to call from any thread; an activation issued while a scan is in progress is kept and
     * triggers one more scan right afterwards.
     */
    @Override
    public void active() {
        synchronized (this) {
            activated = true;
            this.notify();
        }
    }

    /**
     * Scan loop: submits pending tasks while capacity is available, then waits for the next
     * activation or timeout. Ends when {@link #destroy()} is called or the thread is interrupted.
     *
     * @return an empty map once the loop has ended
     * @throws Exception declared by {@link Callable}; failures while submitting are logged and
     *                   do not end the loop
     */
    @Override
    public Map<String, Object> call() throws Exception {
        while (running) {
            try {
                submitPendingTasks();
            } catch (Exception e) {
                log.error("Submit InnerSparkExecution Error!", e);
            }

            try {
                awaitActivation();
            } catch (InterruptedException e) {
                log.error("Interrupted TaskActivationTask Thread!");
                // Interrupted by the system: stop the loop and restore the interrupt status so
                // that the owning executor can observe it.
                running = false;
                Thread.currentThread().interrupt();
            }
        }

        return new HashMap<>();
    }

    /**
     * Submits up to "free slot" many pending tasks, where the number of free slots is the
     * configured parallelism minus the number of currently running tasks.
     */
    private void submitPendingTasks() {
        // Read the running size once so that the guard and the submit limit agree; a
        // non-positive value means there is no capacity and nothing may be submitted.
        final int freeSlots = taskProperties.getParallel() - Data3CSparkQueue.getRunningTaskSize();
        if (freeSlots <= 0 || Data3CSparkQueue.getPendingTaskSize() <= 0) {
            return;
        }

        final Set<String> pendingTasks = Data3CSparkQueue.getPendingTasks();
        int submitted = 0;
        for (String taskId : pendingTasks) {
            Data3CSparkQueue.getInstance().execute(taskExecutor.getThreadPoolExecutor(),
                    sparkTaskFactory.newTask(taskId, (Map) Data3CSparkQueue.getTaskInfo(taskId)));
            submitted++;
            // Leave the loop right after the last allowed submission, before the iterator is
            // advanced again.
            if (submitted >= freeSlots) {
                break;
            }
        }
    }

    /**
     * Blocks until {@link #active()} is called or {@link #SCAN_INTERVAL_MILLIS} elapses. Returns
     * immediately when an activation is already pending or the loop has been asked to stop.
     *
     * @throws InterruptedException if the scan thread is interrupted while waiting
     */
    private void awaitActivation() throws InterruptedException {
        synchronized (this) {
            // wait() releases the monitor and yields the CPU, unlike sleep(). A spurious wake-up
            // only causes an early rescan, which is harmless, so no retry loop is needed.
            if (!activated && running) {
                this.wait(SCAN_INTERVAL_MILLIS);
            }
            activated = false;
        }
    }

    /**
     * Stops the scan loop when the Spring context is destroyed: clears the running flag, wakes
     * the scan thread and cancels its future (interrupting it if necessary).
     *
     * @throws Exception declared by {@link DisposableBean}
     */
    @Override
    public void destroy() throws Exception {
        log.warn("destroy spring context, will stop TaskActivationTask thread!");
        running = false;
        active();
        if (taskFuture != null) {
            taskFuture.cancel(true);
        }
    }

}
